// Copyright 2022-2023 Tigris Data, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//      http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package database

import (
	"context"

	jsoniter "github.com/json-iterator/go"
	"github.com/rs/zerolog/log"
	api "github.com/tigrisdata/tigris/api/server/v1"
	"github.com/tigrisdata/tigris/errors"
	"github.com/tigrisdata/tigris/schema"
	cschema "github.com/tigrisdata/tigris/schema/lang"
	"github.com/tigrisdata/tigris/server/metadata"
	"github.com/tigrisdata/tigris/server/metrics"
	"github.com/tigrisdata/tigris/server/transaction"
	"github.com/tigrisdata/tigris/store/kv"
	ulog "github.com/tigrisdata/tigris/util/log"
)

// ImportQueryRunner executes an import request: it inserts the request's
// documents into a collection and, when needed, infers or evolves the
// collection schema from those documents before inserting.
type ImportQueryRunner struct {
	*BaseQueryRunner

	// req is the import request being executed.
	req          *api.ImportRequest
	// queryMetrics is tagged with the "import" write type and attached to the
	// span tags once the import succeeds.
	queryMetrics *metrics.WriteQueryMetrics
}

// evolveSchema infers a schema from the documents of the import request and
// applies it to the request's collection in a transaction of its own.
//
// rawSchema is the JSON of the schema to start the inference from; when it is
// nil the inference starts from an empty schema (used when the collection does
// not exist yet). The inferred schema is applied through
// tenant.CreateCollection against the database resolved for the request's
// project and branch, i.e. the same project/branch pair Run uses to look the
// collection up.
//
// It returns an Aborted error when CreateCollection reports
// kv.ErrDuplicateKey, and otherwise propagates the first error encountered.
func (runner *ImportQueryRunner) evolveSchema(ctx context.Context, tenant *metadata.Tenant, rawSchema []byte) error {
	var sch cschema.Schema
	req := runner.req

	if rawSchema != nil {
		err := jsoniter.Unmarshal(rawSchema, &sch)
		if ulog.E(err) {
			return err
		}
	}

	// All documents of the batch are handed to the inference.
	err := schema.Infer(&sch, req.GetCollection(), req.GetDocuments(), req.GetPrimaryKey(), req.GetAutogenerated(), len(req.GetDocuments()))
	if err != nil {
		return err
	}

	b, err := jsoniter.Marshal(&sch)
	if ulog.E(err) {
		return err
	}

	log.Debug().Str("from", string(rawSchema)).Str("to", string(b)).Msg("evolving schema on import")

	schFactory, err := schema.NewFactoryBuilder(true).Build(req.GetCollection(), b)
	if err != nil {
		return err
	}

	// Update collection schema in its own transaction
	tx, err := runner.txMgr.StartTx(ctx)
	if err != nil {
		return err
	}
	// Rollback is attempted on every exit path; its result is intentionally
	// ignored because the outcome of this function is decided by Commit below.
	defer func() { _ = tx.Rollback(ctx) }()

	// Resolve the database for the branch named in the request. Run looks the
	// collection up with the same branch, so the schema change must land there
	// for the subsequent lookup/retry to observe it.
	db, err := runner.getDatabase(ctx, tx, tenant, req.GetProject(), req.GetBranch())
	if err != nil {
		return err
	}

	err = tenant.CreateCollection(ctx, tx, db, schFactory)
	if err == kv.ErrDuplicateKey {
		// this simply means, concurrently CreateCollection is called,
		return errors.Aborted("concurrent createReq collection request, aborting")
	}

	if err != nil {
		return err
	}

	return tx.Commit(ctx)
}

// Run imports the documents of the request into the request's collection.
//
// Flow:
//  1. Look up the database and collection. If the lookup fails with
//     NOT_FOUND and the request has CreateCollection set, a schema is
//     inferred from the documents, the collection is created via
//     evolveSchema, and the lookup is repeated. Any other lookup error is
//     returned as is.
//  2. Insert the documents using the caller-supplied transaction tx.
//  3. If the insert fails with INVALID_ARGUMENT, tx is rolled back, the
//     schema is evolved from the current collection schema and the documents,
//     and the insert is retried once in a new transaction that is committed
//     here.
//
// kv.ErrDuplicateKey from either insert attempt is reported as AlreadyExists.
// The returned context is the input context wrapped by the CDC manager with
// the database name once the database is known; on earlier failures the
// unmodified input context is returned.
func (runner *ImportQueryRunner) Run(ctx context.Context, tx transaction.Tx, tenant *metadata.Tenant) (Response, context.Context, error) {
	req := runner.req

	db, coll, err := runner.getDBAndCollection(ctx, tx, tenant,
		req.GetProject(), req.GetCollection(), req.GetBranch())

	//FIXME: errors.As(err, &ep) doesn't work
	//nolint:errorlint
	ep, ok := err.(*api.TigrisError)
	if err != nil && (!ok || ep.Code != api.Code_NOT_FOUND || !req.CreateCollection) {
		return Response{}, ctx, err
	}
	if err != nil {
		// api.Code_NOT_FOUND && runner.req.CreateCollection
		// Infer schema and createReq collection from the first batch of documents
		if err := runner.evolveSchema(ctx, tenant, nil); err != nil {
			return Response{}, ctx, err
		}

		db, coll, err = runner.getDBAndCollection(ctx, tx, tenant,
			req.GetProject(), req.GetCollection(), req.GetBranch())
		if err != nil {
			return Response{}, ctx, err
		}
	}

	ctx = runner.cdcMgr.WrapContext(ctx, db.Name())

	if err = runner.mustBeDocumentsCollection(coll, "insert"); err != nil {
		return Response{}, ctx, err
	}

	ts, allKeys, err := runner.insertOrReplace(ctx, tx, tenant, coll, req.GetDocuments(), true)
	if err != nil {
		if err == kv.ErrDuplicateKey {
			return Response{}, ctx, errors.AlreadyExists(err.Error())
		}

		// Only INVALID_ARGUMENT is treated as a schema mismatch worth a retry.
		ep, ok = err.(*api.TigrisError)
		if !ok || ep.Code != api.Code_INVALID_ARGUMENT {
			return Response{}, ctx, err
		}

		// Rollback original transaction, where partial batch insert might be succeeded.
		ulog.E(tx.Rollback(ctx))

		// Failed to insert due to schema change. Infer and update the schema.
		if err := runner.evolveSchema(ctx, tenant, coll.Schema); err != nil {
			return Response{}, ctx, err
		}

		// Retry insert after schema update in its own transaction. A distinct
		// name keeps the rolled-back tx parameter from being shadowed.
		retryTx, err := runner.txMgr.StartTx(ctx)
		if err != nil {
			return Response{}, ctx, err
		}
		defer func() { _ = retryTx.Rollback(ctx) }()

		// coll was fetched before the schema was evolved. Reload it in the
		// retry transaction so the retry does not run against the collection
		// state that just rejected the batch.
		_, coll, err = runner.getDBAndCollection(ctx, retryTx, tenant,
			req.GetProject(), req.GetCollection(), req.GetBranch())
		if err != nil {
			return Response{}, ctx, err
		}

		// Retry insert after updating the schema
		ts, allKeys, err = runner.insertOrReplace(ctx, retryTx, tenant, coll, req.GetDocuments(), true)
		if err == kv.ErrDuplicateKey {
			return Response{}, ctx, errors.AlreadyExists(err.(kv.StoreError).Msg())
		}

		if err != nil {
			return Response{}, ctx, err
		}

		if err = retryTx.Commit(ctx); err != nil {
			return Response{}, ctx, err
		}
	}

	runner.queryMetrics.SetWriteType("import")
	metrics.UpdateSpanTags(ctx, runner.queryMetrics)

	return Response{
		CreatedAt: ts,
		AllKeys:   allKeys,
		Status:    InsertedStatus,
	}, ctx, nil
}
